package com.gfscold.trans.common.app;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;

/**
 * Base class for Flink SQL / Table API streaming jobs.
 *
 * <p>{@link #start(int, int, String)} builds the stream and table environments, applies the
 * checkpoint settings shared by all jobs, and then delegates to
 * {@link #handle(StreamTableEnvironment, StreamExecutionEnvironment, String)}, which subclasses
 * implement with the job-specific logic.
 */
public abstract class BaseSQLApp {

    /**
     * Job-specific logic, invoked by {@link #start(int, int, String)} once the environments
     * have been configured.
     *
     * @param tableEnv table environment created on top of {@code env}
     * @param env      stream execution environment with parallelism and checkpointing configured
     * @param groupId  the {@code ckAndGroupId} value that was passed to {@code start}
     */
    public abstract void handle(StreamTableEnvironment tableEnv, StreamExecutionEnvironment env, String groupId);

    /**
     * Prepares the execution environment and runs {@code handle}.
     *
     * @param port         value written to the {@code rest.port} configuration key of the
     *                     execution environment
     * @param parallelism  default parallelism applied to the job
     * @param ckAndGroupId identifier forwarded unchanged to {@code handle} as its
     *                     {@code groupId} argument
     */
    public void start(int port, int parallelism, String ckAndGroupId) {
        // 1. Environment setup.
        // The port is passed through the configuration so that it actually takes effect;
        // previously the parameter was accepted but never used.
        Configuration conf = new Configuration();
        conf.setString("rest.port", String.valueOf(port));
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Honour the caller-supplied parallelism instead of a hard-coded 1.
        env.setParallelism(parallelism);

        // 2. Checkpointing.
        // 2.1 Enable checkpointing with a 50 s interval.
        env.enableCheckpointing(50000);
        // 2.2 Checkpointing mode: exactly-once.
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // 2.3 Checkpoint storage location.
        // NOTE(review): this is a hard-coded local Windows path; it is kept as-is so existing
        // checkpoints stay reachable, but it should probably come from configuration.
        env.getCheckpointConfig().setCheckpointStorage("file:\\C:\\check");
        // 2.4 At most one checkpoint in flight at a time.
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 2.5 Minimum pause between two checkpoints (ms).
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);
        // 2.6 Checkpoint timeout (ms).
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // 2.7 Keep externalized checkpoints when the job is cancelled.
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(RETAIN_ON_CANCELLATION);

        // 3. Run the job-specific logic.
        handle(tableEnv, env, ckAndGroupId);
    }
}
